package com.kafka.cn.producer;

import com.kafka.cn.util.CustomProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.Properties;

/**
 * @author: yangShen
 * @Description:
 * @Date: 2020/4/1 10:13
 */
public class UseInterceptorProducer {
    public static void main(String[] args) throws InterruptedException {

        Properties properties = CustomProperties.getProducerProperties();

        //添加拦截器
        ArrayList<String> interceptors = new ArrayList<>();
        //添加顺序，根据业务来定
        interceptors.add("com.kafka.cn.interceptor.TimeInterceptor");
        interceptors.add("com.kafka.cn.interceptor.CountInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

        // 9.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);


        try {
            //10.发送数据：鼠标放在编译错误处 ctrl+p
            for (int i = 0; i < 10; i++) {
                //1.异步发送消息，常用
                producer.send(new ProducerRecord<String, String>("first","atguigu", "atguigu---"+i));
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 11.关闭资源，如果不关闭，等待时间还没有超过1ms，此时消费者会接收不到数据
            //或者
            //Thread.sleep(100);
            //此处的close()方法，会调用拦截器的close()方法
            producer.close();
        }



    }
}
